package com.snda.storage.service;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.snda.storage.exceptions.CSServiceException;
import com.snda.storage.exceptions.ServiceException;
import com.snda.storage.security.ProviderCredentials;
import com.snda.storage.service.impl.rest.httpclient.RestStorageService;
import com.snda.storage.service.model.CSBucket;
import com.snda.storage.service.model.CSObject;
import com.snda.storage.service.model.MultipartCompleted;
import com.snda.storage.service.model.MultipartPart;
import com.snda.storage.service.model.MultipartUpload;
import com.snda.storage.service.model.StorageBucket;
import com.snda.storage.service.model.StorageObject;
import com.snda.storage.service.utils.MultipartUtils;
import com.snda.storage.service.utils.RestUtils;
import com.snda.storage.service.utils.ServiceUtils;

/**
 * A service that handles communication with Cloud Storage, offering all the operations that can be performed
 * on SNDA accounts.
 *
 */
public abstract class CSService extends RestStorageService {
	
	private static final Logger log = LoggerFactory.getLogger(CSService.class);	

	/**
     * Construct a <code>CSService</code> identified by the given user credentials.
     *
     * @param credentials
     * the SNDA user credentials to use when communicating with Cloud Storage, may be null in which case the
     * communication is done as an anonymous user.
     */
	public CSService(ProviderCredentials credentials) {
		super(credentials);
	}
	
	/**
     * Returns the credentials this service was constructed with.
     *
     * @return
     * the provider credentials in use; null when the service is anonymous.
     */
	public ProviderCredentials getSNDACredentials() {
		return this.credentials;
	}
	
	/**
     * Generates a signed URL string that will grant access to an CS resource (bucket or object)
     * to whoever uses the URL up until the time specified.
     *
     * @param method
     * the HTTP method to sign, such as GET or PUT.
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param specialParamName
     * the name of a request parameter to add to the URL generated by this method. 'Special'
     * parameters may include parameters that specify the kind of CS resource that the URL
     * will refer to, such as 'logging', or 'location'.
     * @param headersMap
     * headers to add to the signed URL, may be null.
     * Headers that <b>must</b> match between the signed URL and the actual request include:
     * content-md5, content-type, and any header starting with 'x-snda-'.
     * @param secondsSinceEpoch
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     *  <b>Note:</b> This time is specified in seconds since the epoch, not milliseconds.
     * @param isVirtualHost
     * if this parameter is true, the bucket name is treated as a virtual host name. To use
     * this option, the bucket name must be a valid DNS name that is an alias to an CS bucket.
     * @param isHttps
     * if true, the signed URL will use the HTTPS protocol. If false, the signed URL will
     * use the HTTP protocol.
     * @param isDnsBucketNamingDisabled
     * if true, the signed URL will not use the DNS-name format for buckets eg.
     *
     * @return
     * a URL signed in such a way as to grant access to an CS resource to whoever uses it.
     *
     */
	public String createSignedUrl(String method, String bucketName, String objectKey,
	        String specialParamName, Map<String, Object> headersMap, long secondsSinceEpoch,
	        boolean isVirtualHost, boolean isHttps, boolean isDnsBucketNamingDisabled) {
		try {
            String csEndpoint = this.getSignedUrlEndpoint();
            String uriPath = "";

            // Determine the hostname the URL will address: either the caller-supplied
            // virtual host (the bucket name itself), or a service-generated hostname.
            String hostname = (isVirtualHost
                ? bucketName
                : ServiceUtils.generateCSHostnameForBucket(
                    bucketName, isDnsBucketNamingDisabled, csEndpoint));

            if (headersMap == null) {
                headersMap = new HashMap<String, Object>();
            }

            // If we are using an alternative hostname, include the hostname/bucketname in the resource path.
            String virtualBucketPath = "";
            if (!csEndpoint.equals(hostname)) {
                int subdomainOffset = hostname.lastIndexOf("." + csEndpoint);
                if (subdomainOffset > 0) {
                    // Hostname represents an CS sub-domain, so the bucket's name is the CNAME portion
                    virtualBucketPath = hostname.substring(0, subdomainOffset) + "/";
                } else {
                    // Hostname represents a virtual host, so the bucket's name is identical to hostname
                    virtualBucketPath = hostname + "/";
                }
                uriPath = (objectKey != null ? RestUtils.encodeUrlPath(objectKey, "/") : "");
            } else {
                uriPath = bucketName + (objectKey != null ? "/" + RestUtils.encodeUrlPath(objectKey, "/") : "");
            }

            // Start the query string: optional special parameter first, then the
            // authentication parameters required for a signed URL.
            if (specialParamName != null) {
                uriPath += "?" + specialParamName + "&";
            } else {
                uriPath += "?";
            }

            uriPath += "SNDAAccessKeyId=" + credentials.getAccessKey();
            uriPath += "&Expires=" + secondsSinceEpoch;

            // Include Requester Pays header flag, if the flag is included as a request parameter.
            if (specialParamName != null
                && specialParamName.toLowerCase().indexOf(Constants.REQUESTER_PAYS_BUCKET_FLAG) >= 0)
            {
                String[] requesterPaysHeaderAndValue = Constants.REQUESTER_PAYS_BUCKET_FLAG.split("=");
                headersMap.put(requesterPaysHeaderAndValue[0], requesterPaysHeaderAndValue[1]);
            }

            String serviceEndpointVirtualPath = this.getVirtualPath();

            // NOTE: the canonical string includes the virtual-host bucket path even
            // though the returned URL omits it there — the hostname already carries
            // the bucket name in that case.
            String canonicalString = RestUtils.makeServiceCanonicalString(method,
                serviceEndpointVirtualPath + "/" + virtualBucketPath + uriPath,
                renameMetadataKeys(headersMap), String.valueOf(secondsSinceEpoch),
                this.getRestHeaderPrefix(), this.getResourceParameterNames());
            if (log.isDebugEnabled()) {
                log.debug("Signing canonical string:\n" + canonicalString);
            }

            // Sign the canonical string with HMAC-SHA1 using the account's secret key,
            // then URL-encode the signature so it is safe in a query parameter.
            String signedCanonical = ServiceUtils.signWithHmacSha1(credentials.getSecretKey(),
                canonicalString);
            String encodedCanonical = RestUtils.encodeUrlString(signedCanonical);
            uriPath += "&Signature=" + encodedCanonical;

            // Assemble the final URL, omitting the port when it is the scheme default.
            if (isHttps) {
                int httpsPort = this.getHttpsPort();
                return "https://" + hostname
                    + (httpsPort != 443 ? ":" + httpsPort : "")
                    + serviceEndpointVirtualPath
                    + "/" + uriPath;
            } else {
                int httpPort = this.getHttpPort();
                return "http://" + hostname
                + (httpPort != 80 ? ":" + httpPort : "")
                + serviceEndpointVirtualPath
                + "/" + uriPath;
            }
        } catch (ServiceException se) {
            throw new CSServiceException(se);
        } catch (UnsupportedEncodingException e) {
            throw new CSServiceException(e);
        }
	}
	
	/**
     * Generates a signed URL string that will grant access to an CS resource (bucket or object)
     * to whoever uses the URL up until the time specified.
     *
     * @param method
     * the HTTP method to sign, such as GET or PUT.
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param specialParamName
     * the name of a request parameter to add to the URL generated by this method. 'Special'
     * parameters may include parameters that specify the kind of CS resource that the URL
     * will refer to, such as 'logging' or 'location'.
     * @param headersMap
     * headers to add to the signed URL, may be null.
     * Headers that <b>must</b> match between the signed URL and the actual request include:
     * content-md5, content-type, and any header starting with 'x-snda-'.
     * @param secondsSinceEpoch
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     *  <b>Note:</b> This time is specified in seconds since the epoch, not milliseconds.
     * @param isVirtualHost
     * if this parameter is true, the bucket name is treated as a virtual host name. To use
     * this option, the bucket name must be a valid DNS name that is an alias to an CS bucket.
     *
     * @return
     * a URL signed in such a way as to grant access to an CS resource to whoever uses it.
     *
     */
	public String createSignedUrl(String method, String bucketName,
			String objectKey, String specialParamName,
			Map<String, Object> headersMap, long secondsSinceEpoch,
			boolean isVirtualHost) {
		// Delegate, deriving the protocol and DNS-bucket-naming settings from
		// this service's own configuration.
		return createSignedUrl(method, bucketName, objectKey, specialParamName,
				headersMap, secondsSinceEpoch, isVirtualHost,
				this.isHttpsOnly(), this.getDisableDnsBuckets());
	}
	
	/**
     * Generates a signed URL string that will grant access to an CS resource (bucket or object)
     * to whoever uses the URL up until the time specified.
     *
     * @param method
     * the HTTP method to sign, such as GET or PUT.
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param specialParamName
     * the name of a request parameter to add to the URL generated by this method. 'Special'
     * parameters may include parameters that specify the kind of CS resource that the URL
     * will refer to, such as 'logging' or 'location'.
     * @param headersMap
     * headers to add to the signed URL, may be null.
     * Headers that <b>must</b> match between the signed URL and the actual request include:
     * content-md5, content-type, and any header starting with 'x-snda-'.
     * @param secondsSinceEpoch
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     *  <b>Note:</b> This time is specified in seconds since the epoch, not milliseconds.
     *
     * @return
     * a URL signed in such a way as to grant access to an CS resource to whoever uses it.
     *
     */
    public String createSignedUrl(String method, String bucketName, String objectKey,
        String specialParamName, Map<String, Object> headersMap, long secondsSinceEpoch) {
        // Path-style variant: the bucket name is never treated as a virtual host.
        boolean isVirtualHost = false;
        return createSignedUrl(method, bucketName, objectKey, specialParamName,
            headersMap, secondsSinceEpoch, isVirtualHost);
    }
	
    /**
     * Generates a signed GET URL.
     *
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param expiryTime
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     * @param isVirtualHost
     * if this parameter is true, the bucket name is treated as a virtual host name. To use
     * this option, the bucket name must be a valid DNS name that is an alias to an CS bucket.
     *
     * @return
     * a URL signed in such a way as to grant GET access to an CS resource to whoever uses it.
     */
	public String createSignedGetUrl(String bucketName, String objectKey,
			Date expiryTime, boolean isVirtualHost) {
		// The signing API expects whole seconds since the epoch, not milliseconds.
		final long expirySeconds = expiryTime.getTime() / 1000;
		return createSignedUrl("GET", bucketName, objectKey, null, null,
				expirySeconds, isVirtualHost);
	}
	
	/**
     * Generates a signed GET URL.
     *
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param expiryTime
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     *
     * @return
     * a URL signed in such a way as to grant GET access to an CS resource to whoever uses it.
     */
	public String createSignedGetUrl(String bucketName, String objectKey,
	        Date expiryTime) {
		// Convenience overload: path-style addressing (no virtual host).
		final boolean isVirtualHost = false;
		return createSignedGetUrl(bucketName, objectKey, expiryTime, isVirtualHost);
	}
	
	/**
     * Generates a signed PUT URL.
     *
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param headersMap
     * headers to add to the signed URL, may be null.
     * Headers that <b>must</b> match between the signed URL and the actual request include:
     * content-md5, content-type, and any header starting with 'x-snda-'.
     * @param expiryTime
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     * @param isVirtualHost
     * if this parameter is true, the bucket name is treated as a virtual host name. To use
     * this option, the bucket name must be a valid DNS name that is an alias to an CS bucket.
     *
     * @return
     * a URL signed in such a way as to allow anyone to PUT an object into CS.
     */
    public String createSignedPutUrl(String bucketName, String objectKey,
        Map<String, Object> headersMap, Date expiryTime, boolean isVirtualHost) {
        // The signing API expects whole seconds since the epoch, not milliseconds.
        final long expirySeconds = expiryTime.getTime() / 1000;
        return createSignedUrl("PUT", bucketName, objectKey, null, headersMap,
            expirySeconds, isVirtualHost);
    }


    /**
     * Generates a signed PUT URL.
     *
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param headersMap
     * headers to add to the signed URL, may be null.
     * Headers that <b>must</b> match between the signed URL and the actual request include:
     * content-md5, content-type, and any header starting with 'x-snda-'.
     * @param expiryTime
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     *
     * @return
     * a URL signed in such a way as to allow anyone to PUT an object into CS.
     */
    public String createSignedPutUrl(String bucketName, String objectKey,
        Map<String, Object> headersMap, Date expiryTime) {
        // Convenience overload: path-style addressing (no virtual host).
        final boolean isVirtualHost = false;
        return createSignedPutUrl(bucketName, objectKey, headersMap, expiryTime, isVirtualHost);
    }


    /**
     * Generates a signed DELETE URL.
     *
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param expiryTime
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     * @param isVirtualHost
     * if this parameter is true, the bucket name is treated as a virtual host name. To use
     * this option, the bucket name must be a valid DNS name that is an alias to an CS bucket.
     *
     * @return
     * a URL signed in such a way as to allow anyone do DELETE an object in CS.
     */
    public String createSignedDeleteUrl(String bucketName, String objectKey,
        Date expiryTime, boolean isVirtualHost) {
        // The signing API expects whole seconds since the epoch, not milliseconds.
        final long expirySeconds = expiryTime.getTime() / 1000;
        return createSignedUrl("DELETE", bucketName, objectKey, null, null,
            expirySeconds, isVirtualHost);
    }


    /**
     * Generates a signed DELETE URL.
     *
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param expiryTime
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     *
     * @return
     * a URL signed in such a way as to allow anyone do DELETE an object in CS.
     */
    public String createSignedDeleteUrl(String bucketName, String objectKey,
        Date expiryTime) {
        // Convenience overload: path-style addressing (no virtual host).
        final boolean isVirtualHost = false;
        return createSignedDeleteUrl(bucketName, objectKey, expiryTime, isVirtualHost);
    }


    /**
     * Generates a signed HEAD URL.
     *
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param expiryTime
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     * @param isVirtualHost
     * if this parameter is true, the bucket name is treated as a virtual host name. To use
     * this option, the bucket name must be a valid DNS name that is an alias to an CS bucket.
     *
     * @return
     * a URL signed in such a way as to grant HEAD access to an CS resource to whoever uses it.
     */
    public String createSignedHeadUrl(String bucketName, String objectKey,
        Date expiryTime, boolean isVirtualHost) {
        // The signing API expects whole seconds since the epoch, not milliseconds.
        final long expirySeconds = expiryTime.getTime() / 1000;
        return createSignedUrl("HEAD", bucketName, objectKey, null, null,
            expirySeconds, isVirtualHost);
    }


    /**
     * Generates a signed HEAD URL.
     *
     * @param bucketName
     * the name of the bucket to include in the URL, must be a valid bucket name.
     * @param objectKey
     * the name of the object to include in the URL, if null only the bucket name is used.
     * @param expiryTime
     * the time after which URL's signature will no longer be valid. This time cannot be null.
     *
     * @return
     * a URL signed in such a way as to grant HEAD access to an CS resource to whoever uses it.
     */
    public String createSignedHeadUrl(String bucketName, String objectKey,
        Date expiryTime) {
        // Convenience overload: path-style addressing (no virtual host).
        final boolean isVirtualHost = false;
        return createSignedHeadUrl(bucketName, objectKey, expiryTime, isVirtualHost);
    }

	/**
     * Lists the buckets belonging to the service account, translating any
     * {@link ServiceException} into the CS-specific unchecked variant.
     */
	@Override
	public CSBucket[] listAllBuckets() {
		try {
			return CSBucket.cast(super.listAllBuckets());
		} catch (ServiceException e) {
			throw new CSServiceException(e);
		}
	}

	/**
     * Retrieves an object (details and data) from the given bucket, translating
     * any {@link ServiceException} into the CS-specific unchecked variant.
     */
	@Override
	public CSObject getObject(String bucketName, String objectKey) {
		try {
			StorageObject result = super.getObject(bucketName, objectKey);
			return (CSObject) result;
		} catch (ServiceException e) {
			throw new CSServiceException(e);
		}
	}
	
	/**
     * Retrieves an object's details and metadata (no data stream) from the given
     * bucket, translating any {@link ServiceException} into the CS-specific variant.
     */
	@Override
	public CSObject headObject(String bucketName, String objectKey) {
		try {
			StorageObject result = super.headObject(bucketName, objectKey);
			return (CSObject) result;
		} catch (ServiceException e) {
			throw new CSServiceException(e);
		}
	}
	
	/**
     * Lists all objects in the named bucket, translating any
     * {@link ServiceException} into the CS-specific unchecked variant.
     */
	@Override
    public CSObject[] listObjects(String bucketName) {
        try {
            StorageObject[] objects = super.listObjects(bucketName);
            return CSObject.cast(objects);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
	
	/**
     * Lists up to {@code maxListingLength} objects in the named bucket that match
     * the given prefix and delimiter, starting from the beginning of the listing.
     */
	@Override
    public CSObject[] listObjects(String bucketName, String prefix,
        String delimiter, long maxListingLength) {
        try {
            // A null marker starts the chunked listing from the first key.
            return CSObject.cast(super
                .listObjectsChunked(bucketName, prefix, delimiter,
                    maxListingLength, null, false)
                .getObjects());
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
	
	/**
     * Lists all objects in the named bucket matching the given prefix and
     * delimiter, translating any {@link ServiceException} into the CS variant.
     */
	@Override
    public CSObject[] listObjects(String bucketName, String prefix, String delimiter) {
        try {
            StorageObject[] objects = super.listObjects(bucketName, prefix, delimiter);
            return CSObject.cast(objects);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
	
	/**
     * Lists up to {@code maxListingLength} objects matching the given prefix and
     * delimiter, resuming the listing after the given marker key.
     */
	@Override
	public CSObject[] listObjects(String bucketName, String prefix, String delimiter,
			String marker, long maxListingLength) {
		try {
            return CSObject.cast(super
                .listObjectsChunked(bucketName, prefix, delimiter,
                    maxListingLength, marker, false)
                .getObjects());
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
	}
	
	/**
     * Creates a bucket with the given name in the default Cloud Storage location.
     */
	@Override
    public CSBucket createBucket(String bucketName) {
        try {
            // Buckets created through this shortcut use the service-wide default location.
            CSBucket bucket = new CSBucket(bucketName, Constants.CS_DEFAULT_LOCATION);
            return this.createBucket(bucket);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }

    /**
     * Returns the named bucket from the account, creating it in the default
     * Cloud Storage location if it does not yet exist.
     *
     * @param bucketName
     * the name of the bucket to retrieve or create.
     * @return
     * the bucket in the account.
     */
    @Override
    public CSBucket getOrCreateBucket(String bucketName) {
        try {
            // Use the shared default-location constant instead of a hard-coded
            // region string, for consistency with createBucket(String).
            return this.getOrCreateBucket(bucketName, Constants.CS_DEFAULT_LOCATION);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
    
    /**
     * Creates the given bucket at its configured location. Requires an
     * authenticated (non-anonymous) service.
     *
     * @param bucket
     * a valid bucket carrying both name and location.
     * @return
     * the newly created bucket.
     */
    public CSBucket createBucket(CSBucket bucket) {
        try {
            // Both preconditions throw ServiceException on failure.
            assertAuthenticatedConnection("Create Bucket");
            assertValidBucket(bucket, "Create Bucket");
            String name = bucket.getName();
            String location = bucket.getLocation();
            return (CSBucket) createBucketImpl(name, location);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
    
    /**
     * Retrieves the named bucket's details, translating any
     * {@link ServiceException} into the CS-specific unchecked variant.
     */
    @Override
    public CSBucket getBucket(String bucketName) {
        try {
            StorageBucket bucket = super.getBucket(bucketName);
            return (CSBucket) bucket;
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
    
    /**
     * Returns a bucket in your SNDA account, and creates the bucket in the given Cloud Storage location
     * if it does not yet exist.
     * <p>
     * Note: This method will not change the location of an existing bucket if you specify
     * a different location from a bucket's current location. To move a bucket between
     * locations you must first delete it in the original location, then re-create it
     * in the new location.
     *
     * @param bucketName
     * the name of the bucket to retrieve or create.
     * @param location
     * the location of the Cloud Storage data centre in which the bucket will be created.
     * @return
     * the bucket in your account.
     */
	public CSBucket getOrCreateBucket(String bucketName, String location) {
		try {
			assertAuthenticatedConnection("Get or Create Bucket with location");
		} catch (ServiceException e) {
			throw new CSServiceException(e);
		}

		// Return the existing bucket if it is already in this account.
		CSBucket existing = getBucket(bucketName);
		if (existing != null) {
			return existing;
		}
		// Bucket does not exist yet: create it at the requested location.
		return createBucket(new CSBucket(bucketName, location));
	}
	
	/**
     * Puts an object inside an existing bucket in Cloud Storage, creating a new object or overwriting
     * an existing one with the same key.
     * <p>
     * This method can be performed by anonymous services. Anonymous services
     * can put objects into a publicly-writable bucket.
     *
     * @param bucketName
     * the name of the bucket inside which the object will be put.
     * @param object
     * the object containing all information that will be written to Cloud Storage. At very least this object must
     * be valid. Beyond that it may contain: an input stream with the object's data content, metadata,
     * and access control settings.<p>
     * <b>Note:</b> It is very important to set the object's Content-Length to match the size of the
     * data input stream when possible, as this can remove the need to read data into memory to
     * determine its size.
     *
     * @return
     * the object populated with any metadata information made available by Cloud Storage.
     */
    public CSObject putObject(String bucketName, CSObject object) {
        try {
            // Delegate the upload and narrow the result to the CS object type.
            StorageObject stored = super.putObject(bucketName, object);
            return (CSObject) stored;
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
    
    /**
     * Puts an object inside an existing bucket in Cloud Storage, creating a new object or overwriting
     * an existing one with the same key.
     * <p>
     * This method can be performed by anonymous services. Anonymous services
     * can put objects into a publicly-writable bucket.
     *
     * @param bucket
     * the bucket inside which the object will be put, which must be valid.
     * @param object
     * the object containing all information that will be written to Cloud Storage. At very least this object must
     * be valid. Beyond that it may contain: an input stream with the object's data content, metadata,
     * and access control settings.<p>
     * <b>Note:</b> It is very important to set the object's Content-Length to match the size of the
     * data input stream when possible, as this can remove the need to read data into memory to
     * determine its size.
     *
     * @return
     * the object populated with any metadata information made available by Cloud Storage.
     */
    public CSObject putObject(CSBucket bucket, CSObject object) {
        try {
            // Validation throws ServiceException for a null/invalid bucket.
            assertValidBucket(bucket, "Create Object in bucket");
            String bucketName = bucket.getName();
            return putObject(bucketName, object);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
    
    /**
     * Deletes an object from a bucket in Cloud Storage.
     * <p>
     * This method can be performed by anonymous services. Anonymous services
     * can delete objects from publicly-writable buckets.
     *
     * @param bucket
     * the bucket containing the object to be deleted.
     * @param objectKey
     * the key representing the object in Cloud Storage.
     */
    public void deleteObject(CSBucket bucket, String objectKey) {
        try {
            // Both assertions throw ServiceException on invalid input.
            assertValidBucket(bucket, "deleteObject");
            assertValidObject(objectKey, "deleteObject");
            String bucketName = bucket.getName();
            deleteObject(bucketName, objectKey);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
    
    /**
     * Returns an object representing the details of an item in Cloud Storage that meets any given preconditions.
     * The object is returned without the object's data.
     * <p>
     * An exception is thrown if any of the preconditions fail.
     * Preconditions are only applied if they are non-null.
     * <p>
     * This method can be performed by anonymous services. Anonymous services
     * can get details of publicly-readable objects.
     *
     * @param bucket
     * the bucket containing the object.
     * This must be a valid CSBucket object that is non-null and contains a name.
     * @param objectKey
     * the key identifying the object.
     * @return
     * the object with the given key in Cloud Storage, including only general details and metadata (not the data
     * input stream)
     */
    public CSObject headObject(CSBucket bucket, String objectKey) {
        try {
            assertValidBucket(bucket, "Get Object Details");
            // HEAD retrieves details/metadata only — no data stream is opened.
            String bucketName = bucket.getName();
            return (CSObject) headObjectImpl(bucketName, objectKey);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
	
    /**
     * Returns an object representing the details and data of an item in Cloud Storage that meets any given preconditions.
     * <p>
     * <b>Important:</b> It is the caller's responsibility to close the object's data input stream.
     * The data stream should be consumed and closed as soon as is practical as network connections
     * may be held open until the streams are closed. Excessive unclosed streams can lead to
     * connection starvation.
     * <p>
     * An exception is thrown if any of the preconditions fail.
     * Preconditions are only applied if they are non-null.
     * <p>
     * This method can be performed by anonymous services. Anonymous services
     * can get publicly-readable objects.
     * <p>
     * <b>Implementation notes</b><p>
     * Implementations should use {@link #assertValidBucket} assertion.
     *
     * @param bucket
     * the bucket containing the object.
     * This must be a valid CSBucket object that is non-null and contains a name.
     * @param objectKey
     * the key identifying the object.
     * @param ifMatchTags
     * a precondition specifying an MD5 hash the object must match, ignored if null.
     * @param byteRangeStart
     * include only a portion of the object's data - starting at this point, ignored if null.
     * @param byteRangeEnd
     * include only a portion of the object's data - ending at this point, ignored if null.
     * @return
     * the object with the given key in Cloud Storage, including only general details and metadata (not the data
     * input stream)
     */
    public CSObject getObject(CSBucket bucket, String objectKey, 
    		String[] ifMatchTags, Long byteRangeStart, Long byteRangeEnd) {
        try {
            assertValidBucket(bucket, "Get Object");
            // Preconditions and byte-range bounds are applied only when non-null.
            String bucketName = bucket.getName();
            return (CSObject) getObjectImpl(bucketName, objectKey,
                    ifMatchTags, byteRangeStart, byteRangeEnd);
        } catch (ServiceException e) {
            throw new CSServiceException(e);
        }
    }
    
    /**
     * Retrieves the location of a bucket. Only the owner of a bucket may retrieve its location.
     *
     * @param bucketName
     * the name of the bucket whose location will be returned.
     * @return
     * a string representing the location of the bucket, such as "wuxi"; may be
     * null for a bucket in the default ("wuxi") location.
     */
    public String getBucketLocation(String bucketName) {
        return getBucketLocationImpl(bucketName);
    }
    
    /**
     * Apply a JSON access control policy document to a bucket.
     *
     * @param bucketName
     * the name of the bucket the policy will be applied to.
     * @param policyDocument
     * the JSON policy document to apply.
     */
    public void setBucketPolicy(String bucketName, String policyDocument) {
        setBucketPolicyImpl(bucketName, policyDocument);
    }
    
    /**
     * Retrieve the JSON access control policy document for a bucket,
     * or null if the bucket does not have a policy.
     *
     * @param bucketName
     * @return
     * JSON policy document for bucket, or null if the bucket has no policy.
     */
    public String getBucketPolicy(String bucketName) {
        try {
            return getBucketPolicyImpl(bucketName);
        } catch (CSServiceException e) {
            // A 404 response means the bucket has no policy; callers see null.
            if (e.getResponseCode() == 404) {
                return null;
            }
            throw e;
        }
    }
    
    /**
     * Delete the access control policy document for a bucket.
     *
     * @param bucketName
     * the name of the bucket whose policy will be deleted.
     */
    public void deleteBucketPolicy(String bucketName) {
        deleteBucketPolicyImpl(bucketName);
    }
    
    /**
     * Convenience method that uploads a file-based object to a storage service using
     * the regular {@link #putObject(String, StorageObject)} mechanism, or as a
     * multipart upload if the object's file data is larger than the given maximum
     * part size parameter.
     *
     * If a multipart upload is performed this method will perform all the necessary
     * steps, including:
     * <ol>
     * <li>Start a new multipart upload process, based on the object's key name,
     *     metadata.</li>
     * <li>Poll the service for a little while to ensure the just-started upload
     *     is actually available for use before proceeding -- this can take some
     *     time, we give up after 5 seconds (with 1 lookup attempt per second)</li>
     * <li>Divide the object's underlying file into parts with size <= the given
     *     maximum part size</li>
     * <li>Upload each of these parts in turn, with part numbers 1..n</li>
     * <li>Complete the upload once all the parts have been uploaded, or...</li>
     * <li>If there was a failure uploading parts or completing the upload, attempt
     *     to clean up by calling {@link #multipartAbortUpload(MultipartUpload)}
     *     then throw the original exception</li>
     * </ol>
     * This means that any multipart upload will involve sending around 2 + n separate
     * HTTP requests, where n is ceil(objectDataSize / maxPartSize).
     *
     * @param bucketName
     * the name of the bucket in which the object will be stored.
     * @param object
     * a file-based object containing all information that will be written to the service.
     * If the object provided is not file-based -- i.e. it returns null from
     * {@link StorageObject#getDataInputFile()} -- an exception will be thrown immediately.
     * @param maxPartSize
     * the maximum size in bytes for any single upload part. If the given object's data is
     * less than this value it will be uploaded using a regular PUT. If the object has more
     * data than this value it will be uploaded using a multipart upload.
     * The maximum part size value should be <= 5 GB and >= 5 MB.
     *
     * @throws ServiceException
     */
    public void putObjectMaybeAsMultipart(String bucketName, StorageObject object,
        long maxPartSize) {
        // Only file-based objects are supported: splitting into parts requires
        // random access to the underlying file.
        if (object.getDataInputFile() == null) {
            throw new ServiceException(
                "multipartUpload method only supports file-based objects");
        }

        MultipartUtils multipartUtils = new MultipartUtils(maxPartSize);

        // Upload object normally if it doesn't exceed maxPartSize
        if (!multipartUtils.isFileLargerThanMaxPartSize(object.getDataInputFile())) {
            log.debug("Performing normal PUT upload for object with data <= " + maxPartSize);
            putObject(bucketName, object);
        } else {
            log.debug("Performing multipart upload for object with data > " + maxPartSize);

            // Start upload
            MultipartUpload upload = multipartStartUpload(bucketName, object.getKey(),
                object.getMetadataMap());

            // Ensure upload is present on service-side, might take a little time
            boolean foundUpload = false;
            int maxTries = 5; // Allow up to 5 lookups for upload before we give up
            int tries = 0;
            do {
                try {
                    multipartListParts(upload);
                    foundUpload = true;
                } catch (CSServiceException e) {
                    if ("NoSuchUpload".equals(e.getErrorCode())) {
                        tries++;
                        try {
                            Thread.sleep(1000); // Wait for a second
                        } catch (InterruptedException ie) {
                            // Restore the thread's interrupt status so callers can
                            // observe the interruption, then stop polling.
                            Thread.currentThread().interrupt();
                            tries = maxTries;
                        }
                    } else {
                        // Bail out if we get a (relatively) unexpected exception
                        throw e;
                    }
                }
            } while (!foundUpload && tries < maxTries);

            if (!foundUpload) {
                throw new ServiceException(
                    "Multipart upload was started but unavailable for use after "
                    + tries + " attempts, giving up");
            }

            // Will attempt to delete multipart upload upon failure.
            try {
                List<CSObject> partObjects = multipartUtils.splitFileIntoObjectsByMaxPartSize(
                    object.getKey(), object.getDataInputFile());

                List<MultipartPart> parts = new ArrayList<MultipartPart>();
                int partNumber = 1;
                for (CSObject partObject: partObjects) {
                    // Carry the source object's timestamp over to every part.
                    partObject.setLastModifiedDate(object.getLastModifiedDate());
                    MultipartPart part = multipartUploadPart(upload, partNumber, partObject);
                    parts.add(part);
                    partNumber++;
                }

                multipartCompleteUpload(upload, parts);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                // If upload fails for any reason after the upload was started, try to clean up.
                log.warn("Multipart upload failed, attempting clean-up by aborting upload", e);
                try {
                    multipartAbortUpload(upload);
                } catch (CSServiceException e2) {
                    log.warn("Multipart upload failed and could not clean-up by aborting upload", e2);
                }
                // Throw original failure exception
                if (e instanceof ServiceException) {
                    throw (ServiceException) e;
                } else {
                    throw new ServiceException("Multipart upload failed", e);
                }
            }
        }
    }
    
    /**
     * Uploads an object to the storage service as a multipart upload, streaming
     * each part's data from a file stored in HDFS.
     *
     * Unlike {@link #putObjectMaybeAsMultipart(String, StorageObject, long)} this
     * method always performs a multipart upload -- there is no regular-PUT fallback.
     * It will:
     * <ol>
     * <li>Start a new multipart upload process, based on the object's key name
     *     and metadata.</li>
     * <li>Poll the service for a little while to ensure the just-started upload
     *     is actually available for use before proceeding -- this can take some
     *     time, we give up after 5 seconds (with 1 lookup attempt per second)</li>
     * <li>Open the HDFS file once per part, seek to the part's offset, and upload
     *     parts of at most maxPartSize bytes with part numbers 1..n</li>
     * <li>Complete the upload once all the parts have been uploaded, or...</li>
     * <li>If there was a failure uploading parts or completing the upload, attempt
     *     to clean up by calling {@link #multipartAbortUpload(MultipartUpload)}
     *     then throw the original exception</li>
     * </ol>
     *
     * @param bucketName
     * the name of the bucket in which the object will be stored.
     * @param object
     * the object to store; must have a non-null data input stream, and its clone
     * must be a {@link CSObject} (otherwise a ClassCastException is raised).
     * @param maxPartSize
     * the maximum size in bytes for any single upload part.
     * The maximum part size value should be <= 5 GB and >= 5 MB.
     * @param totalSize
     * the total size in bytes of the data to upload.
     * @param fs
     * the Hadoop file system holding the source data.
     * @param hdfsPath
     * path of the source file within the Hadoop file system.
     *
     * @throws ServiceException
     */
    public void putObjectAsMultipart(String bucketName, StorageObject object,
        long maxPartSize, long totalSize, FileSystem fs, String hdfsPath) {
        // A source input stream is mandatory.
        if (object.getDataInputStream() == null) {
            throw new ServiceException(
                "multipartUpload method should have a objects inputstream to upload");
        }

        // NOTE(review): the instance is otherwise unused, but the constructor may
        // validate maxPartSize -- confirm before removing this line.
        MultipartUtils multipartUtils = new MultipartUtils(maxPartSize);

        log.debug("Performing multipart upload for object with data > "
                + maxPartSize);

        // Start upload
        MultipartUpload upload = multipartStartUpload(bucketName,
                object.getKey(), object.getMetadataMap());

        // Ensure upload is present on service-side, might take a little time
        boolean foundUpload = false;
        int maxTries = 5; // Allow up to 5 lookups for upload before we give up
        int tries = 0;
        do {
            try {
                multipartListParts(upload);
                foundUpload = true;
            } catch (CSServiceException e) {
                if ("NoSuchUpload".equals(e.getErrorCode())) {
                    tries++;
                    try {
                        Thread.sleep(1000); // Wait for a second
                    } catch (InterruptedException ie) {
                        // Restore the thread's interrupt status so callers can
                        // observe the interruption, then stop polling.
                        Thread.currentThread().interrupt();
                        tries = maxTries;
                    }
                } else {
                    // Bail out if we get a (relatively) unexpected exception
                    throw e;
                }
            }
        } while (!foundUpload && tries < maxTries);

        if (!foundUpload) {
            throw new ServiceException(
                    "Multipart upload was started but unavailable for use after "
                            + tries + " attempts, giving up");
        }

        // Will attempt to delete multipart upload upon failure.
        try {
            List<MultipartPart> parts = new ArrayList<MultipartPart>();
            int partNumber = 1;
            long uploadSize = 0L;
            long remainderSize = totalSize % maxPartSize;
            long totalPartsCount = totalSize / maxPartSize;
            if (remainderSize > 0) {
                totalPartsCount++;
            }

            while (partNumber <= totalPartsCount) {
                // Every part is maxPartSize bytes except possibly the last one,
                // which carries the remainder. When totalSize is an exact multiple
                // of maxPartSize the last part must be a full-size part (the old
                // code wrongly set its length to totalSize % maxPartSize == 0).
                if (partNumber == totalPartsCount && remainderSize > 0) {
                    object.setContentLength(remainderSize);
                } else {
                    object.setContentLength(maxPartSize);
                }
                // Re-open the HDFS file for each part and seek to its offset.
                // NOTE(review): the stream is handed to the part object;
                // presumably the upload machinery closes it after sending --
                // confirm, otherwise each part leaks a stream.
                FSDataInputStream fsDataInputStream = fs.open(new Path(hdfsPath));
                fsDataInputStream.seek(uploadSize);
                CSObject partObject = (CSObject) object.clone();
                partObject.setDataInputStream(fsDataInputStream);
                MultipartPart part = multipartUploadPart(upload, partNumber,
                        partObject);
                parts.add(part);
                partNumber++;
                uploadSize += object.getContentLength();
            }

            multipartCompleteUpload(upload, parts);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // If upload fails for any reason after the upload was started, try
            // to clean up.
            log.warn(
                    "Multipart upload failed, attempting clean-up by aborting upload",
                    e);
            try {
                multipartAbortUpload(upload);
            } catch (CSServiceException e2) {
                log.warn(
                        "Multipart upload failed and could not clean-up by aborting upload",
                        e2);
            }
            // Throw original failure exception
            if (e instanceof ServiceException) {
                throw (ServiceException) e;
            } else {
                throw new ServiceException("Multipart upload failed", e);
            }
        }
    }
    
    /**
     * Starts a multipart upload process for a given object. This must be done
     * before any individual parts can be uploaded.
     *
     * @param bucketName
     * the name of the bucket in which the object will be stored.
     * @param objectKey
     * the key name of the object.
     * @param metadata
     * metadata to apply to the completed object, may be null.
     * @return
     * object representing this multipart upload.
     */
    public MultipartUpload multipartStartUpload(String bucketName, String objectKey,
        Map<String, Object> metadata) {
        // No server-side encryption algorithm is requested at this level.
        return multipartStartUploadImpl(bucketName, objectKey, metadata, null);
    }
    
    /**
     * Aborts the given multipart upload process. Any parts that may already have
     * been uploaded are deleted as well.
     *
     * @param upload
     * the multipart upload to abort.
     */
    public void multipartAbortUpload(MultipartUpload upload) {
        String uploadId = upload.getUploadId();
        String bucketName = upload.getBucketName();
        String objectKey = upload.getObjectKey();
        multipartAbortUploadImpl(uploadId, bucketName, objectKey);
    }
    
    /**
     * Lists every multipart upload that has been started within a bucket and has
     * not yet been completed or aborted.
     *
     * @param bucketName
     * the bucket whose multipart uploads will be listed.
     * @return
     * a list of incomplete multipart uploads.
     */
    public List<MultipartUpload> multipartListUploads(String bucketName) {
        // No prefix/delimiter filtering, no markers, no upload-count cap.
        return multipartListUploads(bucketName, null, null, null, null, null);
    }

    /**
     * Lists the incomplete multipart uploads within a bucket, starting from the
     * given key/upload-id markers, without prefix or delimiter filtering.
     *
     * @param bucketName
     * the bucket whose multipart uploads will be listed.
     * @param nextKeyMarker
     * marker indicating where this list subset should start by key name.
     * @param nextUploadIdMarker
     * marker indicating where this list subset should start by upload ID.
     * @param maxUploads
     * maximum number of uploads to retrieve at a time.
     * @return
     * a list of incomplete multipart uploads.
     */
    public List<MultipartUpload> multipartListUploads(String bucketName,
            String nextKeyMarker, String nextUploadIdMarker, Integer maxUploads) {
        // Prefix and delimiter are intentionally left unset.
        return multipartListUploads(bucketName, null, null,
                nextKeyMarker, nextUploadIdMarker, maxUploads);
    }
    
    /**
     * Lists the multipart uploads that have been started within a bucket and
     * have not yet been completed or aborted.
     *
     * @param bucketName
     * the bucket whose multipart uploads will be listed.
     * @param prefix
     * the prefix to use for the started uploads
     * @param delimiter
     * the delimiter (e.g. '/')
     * @param nextKeyMarker
     * marker indicating where this list subset should start by key name.
     * @param nextUploadIdMarker
     * marker indicating where this list subset should start by upload ID.
     * @param maxUploads
     * maximum number of uploads to retrieve at a time.
     * @return
     * a list of incomplete multipart uploads.
     */
    public List<MultipartUpload> multipartListUploads(String bucketName,
        String prefix, String delimiter,
        String nextKeyMarker, String nextUploadIdMarker, Integer maxUploads) {

        // Ask for a complete listing (the impl keeps paging until done) and
        // flatten the resulting chunk into a plain list.
        MultipartUploadChunk chunk = multipartListUploadsChunkedImpl(
                bucketName, prefix, delimiter,
                nextKeyMarker, nextUploadIdMarker, maxUploads,
                true);
        return Arrays.asList(chunk.getUploads());
    }
    
    /**
     * Lists all or a subset of the multipart uploads that have been started
     * within a bucket and have not yet been completed or aborted.
     *
     * @param bucketName
     * the bucket whose multipart uploads will be listed.
     * @param prefix
     * the prefix to use for the started uploads
     * @param delimiter
     * the delimiter (e.g. '/')
     * @param keyMarker
     * marker indicating where this list subset should start by key name.
     * @param uploadIdMarker
     * marker indicating where this list subset should start by upload ID.
     * @param maxUploads
     * maximum number of uploads to retrieve at a time.
     * @param completeListing
     * true to go on listing as long as there are uploads to be retrieved
     * @return
     * a MultipartUploadChunk holding a list of incomplete multipart uploads.
     * @see MultipartUploadChunk
     */
    public MultipartUploadChunk multipartListUploadsChunked(
            String bucketName,
            String prefix,
            String delimiter,
            String keyMarker,
            String uploadIdMarker,
            Integer maxUploads,
            boolean completeListing) {
        // Straight pass-through to the provider-specific implementation.
        return multipartListUploadsChunkedImpl(bucketName, prefix, delimiter,
                keyMarker, uploadIdMarker, maxUploads, completeListing);
    }
    
    /**
     * Lists the parts that have been uploaded so far for a given multipart upload.
     *
     * @param upload
     * the multipart upload whose parts will be listed.
     * @return
     * a list of multipart parts that have been successfully uploaded.
     */
    public List<MultipartPart> multipartListParts(MultipartUpload upload) {
        String uploadId = upload.getUploadId();
        return multipartListPartsImpl(uploadId, upload.getBucketName(),
                upload.getObjectKey());
    }
    
    /**
     * Completes a multipart upload by combining all the given parts into the
     * final object.
     *
     * @param upload
     * the multipart upload whose parts will be completed.
     * @param parts
     * the parts comprising the final object.
     * @return
     * information about the completion operation.
     */
    public MultipartCompleted multipartCompleteUpload(MultipartUpload upload,
        List<MultipartPart> parts) {
        String uploadId = upload.getUploadId();
        String bucketName = upload.getBucketName();
        return multipartCompleteUploadImpl(uploadId, bucketName,
                upload.getObjectKey(), parts);
    }

    /**
     * Convenience method to complete a multipart upload by automatically finding
     * its parts. This method does more work than the lower-level
     * {@link #multipartCompleteUpload(MultipartUpload, List)} API operation, but
     * relieves the caller of having to keep track of all the parts uploaded
     * for a multipart upload.
     *
     * @param upload
     * the multipart upload whose parts will be completed.
     * @return
     * information about the completion operation.
     */
    public MultipartCompleted multipartCompleteUpload(MultipartUpload upload) {
        // Look up the uploaded parts, then delegate to the lower-level overload
        // so the completion call lives in exactly one place.
        List<MultipartPart> parts = multipartListParts(upload);
        return multipartCompleteUpload(upload, parts);
    }
    
    /**
     * Uploads an individual part that will comprise a piece of a multipart
     * upload object, and records it on the upload's list of uploaded parts.
     *
     * @param upload
     * the multipart upload to which this part will be added.
     * @param partNumber
     * the part's number; must be between 1 and 10,000 and must uniquely identify a given
     * part and represent its order compared to all other parts. Part numbers need not
     * be sequential.
     * @param object
     * an object containing a input stream with data that will be sent to the storage service.
     * @return
     * information about the uploaded part, retain this information to eventually complete
     * the object with {@link #multipartCompleteUpload(MultipartUpload, List)}.
     */
    public MultipartPart multipartUploadPart(MultipartUpload upload, Integer partNumber,
        CSObject object) {
        MultipartPart uploadedPart = multipartUploadPartImpl(
                upload.getUploadId(), upload.getBucketName(), partNumber, object);
        // Keep the upload's own bookkeeping of which parts have been sent.
        upload.addMultipartPartToUploadedList(uploadedPart);
        return uploadedPart;
    }
    
    ///////////////////////////////////////////////////////////
    // Abstract methods that must be implemented by Cloud Storage services
    ///////////////////////////////////////////////////////////

    /** Returns the location string of the given bucket (see {@link #getBucketLocation(String)}). */
    protected abstract String getBucketLocationImpl(String bucketName);
    
    /** Applies the given JSON policy document to the bucket. */
    protected abstract void setBucketPolicyImpl(String bucketName, String policyDocument);
    
    /**
     * Returns the bucket's JSON policy document. Callers of
     * {@link #getBucketPolicy(String)} expect a {@link CSServiceException} with
     * response code 404 when the bucket has no policy.
     */
    protected abstract String getBucketPolicyImpl(String bucketName);
    
    /** Deletes the bucket's policy document. */
    protected abstract void deleteBucketPolicyImpl(String bucketName);
    
    /**
     * Starts a multipart upload for the given key.
     *
     * @param metadata metadata to apply to the completed object, may be null.
     * @param serverSideEncryptionAlgorithm may be null (always null when invoked
     * via {@link #multipartStartUpload(String, String, Map)}).
     */
    protected abstract MultipartUpload multipartStartUploadImpl(String bucketName, String objectKey,
            Map<String, Object> metadata, String serverSideEncryptionAlgorithm);
    
    /** Aborts the identified multipart upload. */
    protected abstract void multipartAbortUploadImpl(String uploadId, String bucketName,
            String objectKey);
    
    /**
     * Lists incomplete multipart uploads in a bucket, optionally filtered by
     * prefix/delimiter and paged via the key/upload-id markers; when
     * completeListing is true, keeps listing until all uploads are retrieved.
     */
    protected abstract MultipartUploadChunk multipartListUploadsChunkedImpl(
            String bucketName,
            String prefix,
            String delimiter,
            String keyMarker,
            String uploadIdMarker,
            Integer maxUploads,
            boolean completeListing);
    
    /**
     * Lists the parts uploaded so far for the identified multipart upload.
     * NOTE(review): {@code putObjectMaybeAsMultipart} relies on this raising a
     * {@link CSServiceException} with error code "NoSuchUpload" while the upload
     * is not yet visible service-side.
     */
    protected abstract List<MultipartPart> multipartListPartsImpl(String uploadId,
            String bucketName, String objectKey);
    
    /** Completes the identified multipart upload by combining the given parts. */
    protected abstract MultipartCompleted multipartCompleteUploadImpl(String uploadId, String bucketName,
            String objectKey, List<MultipartPart> parts);
    
    /** Uploads a single part (data taken from the object's input stream) for the identified upload. */
    protected abstract MultipartPart multipartUploadPartImpl(String uploadId, String bucketName,
            Integer partNumber, CSObject object);
    
    
}